/* --------------------------------------------------------------  */
/* (C)Copyright 2001,2007,                                         */
/* International Business Machines Corporation,                    */
/* Sony Computer Entertainment, Incorporated,                      */
/* Toshiba Corporation,                                            */
/*                                                                 */
/* All Rights Reserved.                                            */
/* --------------------------------------------------------------  */
/* PROLOG END TAG zYx                                              */
#include "../common.h"

#include <libspe2.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/wait.h>
#include <string.h>
#include <math.h>
#include <stdbool.h>
#include <errno.h>
#include <sys/time.h>
#include <unistd.h>
#include <libmisc.h>

struct timezone tzone;
struct timeval time0, time1;
int sec, usec;

/* this is the pointer to the SPE code, to be used at thread creation time */
extern spe_program_handle_t svd_spu;

union
{
	arguments_union_t a;
	char bytes[128];
} args[NUM_SPE_MAX] __attribute__((aligned(128)));

typedef struct
{
	spe_context_ptr_t spe_context;
	void *argp;
	void *envp;
} thread_args_t;

pthread_t threads[NUM_SPE_MAX];
spe_context_ptr_t spe_contexts[NUM_SPE_MAX];
thread_args_t thread_args[NUM_SPE_MAX];
bool active_spe[NUM_SPE_MAX];

void *spe_thread( void *voidarg )
{
	thread_args_t *arg = (thread_args_t *)voidarg;

	unsigned int runflags = 0;
	unsigned int entry = SPE_DEFAULT_ENTRY;

	spe_context_run( arg->spe_context, &entry, runflags, arg->argp, arg->envp, NULL );
	pthread_exit( NULL );
}


void print_matrix(float_matrix_t matrix, size_t M, size_t N, size_t Nexpanded)
{
	size_t i, j;

	printf("matrix[%d][%d]:\n", M, N);

	for (i = 0; i < M; i++)
	{
		for (j = 0; j < N; j++)
		{
			if (MATRIX_ELEMENT(matrix, i, j, Nexpanded))
			{
				printf("matrix[%d][%d] = %g\n", i, j,
					MATRIX_ELEMENT(matrix, i, j, Nexpanded));
			}
		} 
	}
}

float inner_product(float_matrix_row_t row1, float_matrix_row_t row2, size_t N)
{
	size_t i;
	float innerprod = 0.0;

	for (i = 0; i < N; i++)
	{
		if (MATRIX_ROW_ELEMENT(row1, i) != 0.0 && MATRIX_ROW_ELEMENT(row2, i) != 0.0)
			innerprod +=  MATRIX_ROW_ELEMENT(row1, i) * MATRIX_ROW_ELEMENT(row2, i);
	}

	return innerprod;
}

// SPEs are required to send us a mailbox message before we can use them for work
// This message indicates whether the last iteration of the algorithm was convergent or not
// If the SPE hasn't processed any work yet, it will always assume convergence (it must explicitly detect otherwise)
bool assign_spe_job(const size_t lb_i, const size_t ub_i, const size_t lb_j,
	const size_t ub_j, const float delta, const size_t NUM_SPE)
{
	size_t target_spe = INT_MAX, i, i2;
	int retval;
	unsigned int msg, num_msgs;
	unsigned int converged = 1;
	static size_t firstTargetTried = 0;

	// Wait for a free SPE - we should have a message in their outbox if they're free
	do
		for (i = 0, i2 = firstTargetTried; i < NUM_SPE && target_spe == INT_MAX; i++, i2 = (i2 + 1) % NUM_SPE)
		//for (i = 0; i < NUM_SPE && target_spe == INT_MAX; i++)
		{
			// if the SPE isn't active, we can use it without waiting
			if (!active_spe[i2])
			{
				target_spe = i2;
			}
			else
			{
				num_msgs = spe_out_mbox_status(spe_contexts[i2]);
				//printf("spe %d has %d outgoing messages\n", i, num_msgs);
				if (num_msgs > 0)
				{
					target_spe = i2;
					retval = spe_out_mbox_read(spe_contexts[target_spe], &converged, 1);
					if (retval == -1)
					{
						printf("Problem reading from spe %d outbound mailbox %d: %s\n", target_spe, errno, strerror(errno));
					}
				}
			}
		}
	while (target_spe == INT_MAX);
	firstTargetTried = (firstTargetTried + 1) % NUM_SPE;

	//printf ("SPE: i=%d\n", i); fflush(stdout);

	// Mark this SPE active so we know to wait for it later
	active_spe[target_spe] = true;

	// set arguments
	args[target_spe].a.runargs.lb_i = lb_i;
	args[target_spe].a.runargs.ub_i = ub_i;
	args[target_spe].a.runargs.lb_j = lb_j;
	args[target_spe].a.runargs.ub_j = ub_j;
	args[target_spe].a.runargs.delta = delta;

	/*printf("SPE: i=%d %d %d %d %d %f %d %d %d %d %x\n", 
		target_spe, 
		args[target_spe].a.lb_i,
		args[target_spe].a.ub_i,
		args[target_spe].a.lb_j,
		args[target_spe].a.ub_j,
		args[target_spe].a.delta,
		M,
		N,
		Nexpanded,
		R,
		(int)args[target_spe].a.matrix);
	fflush(stdout);
*/
	msg = WORK_CHUNK_READY;
	retval = spe_in_mbox_write(spe_contexts[target_spe], &msg, 1, SPE_MBOX_ALL_BLOCKING);
	if (retval == -1)
	{
		printf("Problem sending to spe %d inbound mailbox %d: %s\n", target_spe, errno, strerror(errno));
	}
	//printf("Sent WORK_CHUNK_READY mail to spe %d\n", target_spe);

	return converged;
}

float calculateDelta(const size_t NUM_SPE)
{
	float sumdotprods = 0.0, delta;
	unsigned int i, msg;
	int retval;
	norm2reply_t * norm2replies[NUM_SPE];
	unsigned int done;
	
	for (i = 0; i < NUM_SPE; i++)
	{
		norm2replies[i] = malloc_align(sizeof(norm2reply_t), 7);
		args[i].a.normargs.startat = i;
		args[i].a.normargs.output = ((unsigned int) norm2replies[i]);
		msg = CALCULATE_NORMS2;
		retval = spe_in_mbox_write(spe_contexts[i], &msg, 1, SPE_MBOX_ALL_BLOCKING);
		if (retval == -1)
		{
			printf("Problem sending to spe %d inbound mailbox %d: %s\n", i, errno, strerror(errno));
		}
	}
	for (i = 0; i < NUM_SPE; i++)
	{
		while (!spe_out_mbox_status(spe_contexts[i]))
		{
			;
		}
		retval = spe_out_mbox_read(spe_contexts[i], &done, 1);
		if (retval == -1)
		{
			printf("Problem reading from spe %d outbound mailbox %d: %s\n", i, errno, strerror(errno));
		}
		sumdotprods += norm2replies[i]->f;
	}

	delta = EPSILON * sumdotprods;
	printf("delta = %f\n", delta);fflush(stdout);
	return delta;
}

size_t do_svd(const size_t R, const size_t M, const float delta, const size_t NUM_SPE)
{
	size_t i, diagonal, row_idx;
	bool converged = false;
	unsigned int temp_converged;
	size_t num_blocks;
	size_t lb_i, ub_i, lb_j, ub_j;
	size_t iterations = 0, num_msgs;
	int retval;
	unsigned int numConverged, numNotConverged;

	num_blocks = M / R + (M % R ? 1 : 0);
	printf("num_blocks = %u\n", num_blocks);fflush(stdout);

	// Mark all SPEs as inactive until we've started jobs on them
	for (i = 0; i < NUM_SPE; i++)
	{
		active_spe[i] = false;
	}

	do
	{
		iterations++;
		//printf("Starting iteration = %d\n", iterations);fflush(stdout);
		numConverged = numNotConverged = 0;
		converged = true;
		for (diagonal = 0; diagonal < num_blocks; diagonal++)
		{

			debug1("Processing diagonal %d of the work matrix\n", diagonal);
			for (row_idx = 0; row_idx < num_blocks - diagonal; row_idx++)
			{
				lb_i = row_idx * R;
				ub_i = (size_t) min(lb_i + R - 1, M - 1);
				lb_j = (row_idx + diagonal) * R;
				ub_j = (size_t) min(lb_j + R - 1, M - 1);
				
				// Find an available SPE, and assign it this job
				// Get the convergence status from the last job it processed
				temp_converged = assign_spe_job(lb_i, ub_i, lb_j, ub_j, delta, NUM_SPE);
				converged &= temp_converged;
			}

			// Wait for any remaining jobs in this diagonal, and get their convergence results
			for (i = 0; i < NUM_SPE; i++)
			{
				// Only wait for this SPE if it was used to process this diagonal
				if (active_spe[i])
				{
					//printf("Waiting for SPE %d to finish\n", i);

					do
					{
						num_msgs = spe_out_mbox_status(spe_contexts[i]);
					}
					while (num_msgs == 0);

					retval = spe_out_mbox_read(spe_contexts[i], &temp_converged, 1);
					if (retval == -1)
					{
						printf("Problem reading from spe %d outbound mailbox %d: %s\n", i, errno, strerror(errno));
					}

					converged &= temp_converged;
					active_spe[i] = false;
				}

			}
		}
	} while (converged != true);
	return iterations;
}

int load_matrix_from_file(float_matrix_t * pMatrix, size_t * M, size_t * N,
	size_t * Nexpanded, const char * filename)
{
	FILE * fp;
	char line[128];
	unsigned int i, uiRow;
	
	// Open file for reading
	if ((fp = fopen(filename, "r")) == NULL)
	{
		printf("Could not open %s for reading: %s\n", filename, strerror(errno));
		return -1;
	}

	// Read unimportant rows
	for (i = 0; i < 4; i++)
	{
		fgets(line, sizeof(line) / sizeof(char), fp);
	}
	// Read number of rows
	fgets(line, sizeof(line) / sizeof(char), fp);
	*M = atoi(&line[8]);
	// Read number of columns
	fgets(line, sizeof(line) / sizeof(char), fp);
	*N = atoi(&line[11]);
	*Nexpanded = ((*N) / 32 + ((*N) % 32 ? 1 : 0)) * 32;
	*pMatrix = malloc_align((*M) * (*Nexpanded) * sizeof(float), 7);
	memset(*pMatrix, 0, (*M) * (*Nexpanded) * sizeof(float));
	while(1)
	{
		// Save current position so we can come back later
		fpos_t mypos;
		size_t uiCols;
		fgetpos(fp, &mypos);
		i = 1;
		// If at eof, quit
		if (!fgets(line, sizeof(line) / sizeof(char), fp))
		{
			break;
		}
		// Get row number for current row
		uiRow = (unsigned int) atoi(line);
		// Get more rows until the row number is not the same or we run out of file
		while (1)
		{
			if (!fgets(line, sizeof(line) / sizeof(char), fp))
			{
				break;
			}
			if ((unsigned int) atoi(line) != uiRow)
			{
				break;
			}
			i++;
		}
		// Now we have the number of columns specified for the row
		uiCols = i;
		// Go back to the marker and read the row again
		fsetpos(fp, &mypos);
		for (i = 0; i < uiCols; i++)
		{
			char * pch;
			unsigned int uiCol;
			float fVal;
			fgets(line, sizeof(line) / sizeof(char), fp);
			pch = strtok(line, " ");
			uiCol = atoi(strtok(NULL, " "));
			fVal = atof(strtok(NULL, " "));
			MATRIX_ELEMENT(*pMatrix, uiRow, uiCol, *Nexpanded) = fVal;
		}
	}
	fclose(fp);
	return 0;
}

void clear_matrix(float_matrix_t * pMatrix)
{
	free_align(*pMatrix);
}

int main(int argc, char * argv[])
{
	size_t M, N, Nexpanded, R, NUM_SPE;
	size_t i = 0, j;
	unsigned int msg;
	int retval;
	float_matrix_t matrix;
	float delta;
	float * sigma;
	size_t iterations;

	if (argc != 4)
	{
		printf("usage: %s <R> <NUM_SPEs> <matrix file>\n", argv[0]);
		return -1;
	}

	R = atoi(argv[1]);
	NUM_SPE = atoi(argv[2]);
	if (NUM_SPE > NUM_SPE_MAX)
	{
		printf("NUM_SPEs must not be > %d\n", NUM_SPE_MAX);
		return -1;
	}
	if (load_matrix_from_file(&matrix, &M, &N, &Nexpanded, argv[3]))
	{
		printf("Error loading matrix, exiting\n");
		return -1;
	}
	
	//print_matrix(matrix, M, N, Nexpanded);

	printf("SVD, M = %d, N = %d, R = %d, SPEs = %d\n", M, N, R, NUM_SPE);  fflush(stdout);

	sigma = malloc(M * sizeof(float));

	gettimeofday( &time0, &tzone );


	// Spin up SPE threads to be used for this run
	printf("Spinning up %d SPE threads to be used for this calculation\n", NUM_SPE);
	for (i = 0; i < NUM_SPE; i++)
	{
		int retval;
		//printf ("SPE: i=%d\n", i); fflush(stdout);
		args[i].a.initargs.matrix = (unsigned long) matrix;
		args[i].a.initargs.Nexpanded = Nexpanded;
		args[i].a.initargs.R = R;
		args[i].a.initargs.M = M;
		args[i].a.initargs.NUM_SPE = NUM_SPE;

		spe_contexts[i] = spe_context_create(0, NULL); // (flags, gang)
		spe_program_load(spe_contexts[i], &svd_spu);

		thread_args[i].spe_context = spe_contexts[i];
		thread_args[i].argp = &args[i];
		thread_args[i].envp = NULL;

		if ((retval = pthread_create(&threads[i], NULL, &spe_thread, &thread_args[i])))
		{
			printf("Error %d in pthread_create\n", retval);
		}
	}
	// Wait until all their mailboxes are ready
	for (i = 0; i < NUM_SPE; i++)
	{
		unsigned int temp_converged;
		while (spe_out_mbox_status(spe_contexts[i]) <= 0)
		{
			;
		}
		spe_out_mbox_read(spe_contexts[i], &temp_converged, 1);
	}
	printf("Finished spinning up SPE threads\n");

	delta = calculateDelta(NUM_SPE);

	// *** Algorithm starts here ***
	iterations = do_svd(R, M, delta, NUM_SPE);
	// *** Algorithm ends here ***

	//print_matrix(matrix, M, N, Nexpanded);

	gettimeofday( &time1, &tzone );

	sec  = time1.tv_sec  - time0.tv_sec ;
	usec = time1.tv_usec - time0.tv_usec ;
	if ( usec < 0 ) { sec--; usec+=1000000 ; }


	printf("PPE: Done matrix[%d][%d]: time=%d.%06d iterations=%u\n", M, N, sec, usec, iterations);  fflush(stdout);
    for (i = 0; i < NUM_SPE; i++)
	{
		// Cleanup: signal SPE threads that there's no more work
		msg = NO_MORE_WORK;
		retval = spe_in_mbox_write(spe_contexts[i], &msg, 1, SPE_MBOX_ALL_BLOCKING);
		if (retval == -1)
		{
			printf("Problem sending to spe %d mailbox %d: %s\n", i, errno, strerror(errno));
		}
		printf("Sent NO_MORE_WORK mail to spe %d\n", i);
	}

	// Now find the sigmas
	// TBD: Should we return this to the caller? Or write it to disk? Should we return anything else?
	for (i = 0; i < M; i++)
	{
		sigma[i] = sqrt( inner_product(MATRIX_ROW(matrix, i, Nexpanded), MATRIX_ROW(matrix, i, Nexpanded), N));
	}
	for (i = 0; i < M; i++)
	{
		for (j = i + 1; j < M; j++)
		{
			if (sigma[i] < sigma[j])
			{
				delta = sigma[i];
				sigma[i] = sigma[j];
				sigma[j] = delta;
			} 
		}
	}
	for (i = 0; i < M; i++)
	{
		printf("sigma[%d] = %f\n", i, sigma[i]);
	}
	// Cleanup: wait for SPE threads to complete
	
	for (i = 0; i < NUM_SPE; i++)
	{
		pthread_join(threads[i], NULL);
	}

	clear_matrix(&matrix);
	free(sigma);
	return 0;
}
